package drds.plus.executor.cursor.cursor.impl.sort;

import drds.plus.common.properties.ConnectionParams;
import drds.plus.executor.ExecuteContext;
import drds.plus.executor.ExecutorException;
import drds.plus.executor.cursor.cursor.ISortingCursor;
import drds.plus.executor.cursor.cursor.ITemporaryTableSortCursor;
import drds.plus.executor.cursor.cursor_factory.CursorFactory;
import drds.plus.executor.cursor.cursor_metadata.CursorMetaData;
import drds.plus.executor.cursor.cursor_metadata.CursorMetaDataImpl;
import drds.plus.executor.data_node_executor.DataNodeExecutorContext;
import drds.plus.executor.record_codec.RecordCodecFactory;
import drds.plus.executor.record_codec.record.KeyValueRecordPair;
import drds.plus.executor.record_codec.record.Record;
import drds.plus.executor.repository.Repository;
import drds.plus.executor.row_values.RowValues;
import drds.plus.executor.row_values.RowValuesWrapper;
import drds.plus.executor.table.ITemporaryTable;
import drds.plus.executor.utils.Utils;
import drds.plus.sql_process.abstract_syntax_tree.configuration.ColumnMetaData;
import drds.plus.sql_process.abstract_syntax_tree.configuration.IndexMapping;
import drds.plus.sql_process.abstract_syntax_tree.configuration.IndexType;
import drds.plus.sql_process.abstract_syntax_tree.configuration.TableMetaData;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.Query;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.Item;
import drds.plus.sql_process.abstract_syntax_tree.expression.order_by.OrderBy;
import drds.plus.sql_process.type.Type;
import drds.plus.util.GeneralUtil;
import drds.tools.$;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/**
 * 用于临时表排序，需要依赖bdb
 */
@Slf4j
public class TemporaryTableCursor extends SortCursor implements ITemporaryTableSortCursor {

    private static final String _temporary_table_ = "_temporary_table_";
    private static final String identity = "__identity__";
    private final long requestId;
    protected long sizeProtection = 102400;
    protected CursorFactory cursorFactory;
    protected ISortingCursor targetCursor = null;
    protected CursorMetaData cursorMetaData = null;
    boolean sortedDuplicates;
    ITemporaryTable temporaryTable = null;
    ExecuteContext executeContext = null;
    /**
     * 是否阶段超出行数的数据
     */
    boolean cutRows = false;
    private boolean inited = false;
    private Repository repository;
    private Query query = null;

    /**
     * @param executeContext
     * @param requestId
     * @param cursor
     * @param orderByList      按照何列排序
     * @param sortedDuplicates 是否允许重复
     * @throws RuntimeException
     */
    public TemporaryTableCursor(ExecuteContext executeContext, Repository repository, long requestId, ISortingCursor cursor, List<OrderBy> orderByList, boolean sortedDuplicates) throws RuntimeException {
        super(cursor, orderByList);
        this.executeContext = executeContext;
        this.repository = repository;
        this.sortedDuplicates = sortedDuplicates;
        targetCursor = cursor;
        this.requestId = requestId;
        sizeProtection = executeContext.getParametersManager().getLong(ConnectionParams.TEMP_TABLE_MAX_ROWS);
        cutRows = executeContext.getParametersManager().getBoolean(ConnectionParams.TEMP_TABLE_CUT_ROWS);
    }

    public TemporaryTableCursor(ExecuteContext executeContext, Repository repository, long requestId, ISortingCursor cursor, boolean sortedDuplicates, List<ColumnMetaData> keyColumns) throws RuntimeException {
        this(executeContext, repository, requestId, cursor, Utils.getOrderByListFromColumnMetaDataList(keyColumns), sortedDuplicates);
    }

    public TemporaryTableCursor(ExecuteContext executeContext, Repository repository, long requestId, ISortingCursor cursor, boolean sortedDuplicates) throws RuntimeException {
        this(executeContext, repository, requestId, cursor, Utils.getOrderByListFromColumnMetaDataList(null), sortedDuplicates);
    }

    public TemporaryTableCursor(ExecuteContext executeContext, Repository repository, long requestId, Query query, boolean sortedDuplicates) throws RuntimeException {
        this(executeContext, repository, requestId, null, Utils.getOrderByListFromColumnMetaDataList(null), sortedDuplicates);
        this.query = query;
    }

    public void init() throws RuntimeException {
        if (!inited) {
            prepare();
            inited = true;
        }
    }

    protected void prepare() throws RuntimeException {
        final String tableName = "temporary_table." + "requestId." + requestId;
        ISortingCursor cursor = null;
        try {
            temporaryTable = executeContext.getTableNameToTemporaryTableCache().get(tableName, new Callable<ITemporaryTable>() {
                public ITemporaryTable call() throws Exception {
                    return createTemporaryTable(tableName);
                }
            });
            ExecuteContext executeContext = new ExecuteContext();
            cursor = temporaryTable.getCursor(executeContext);//根据原有的游标的行数据先导入到bdb，然后查询出来生成的游标结合+排序信息构建新的游标对象
            List<RuntimeException> exs = new ArrayList();
            if (this.cursor != null) {
                exs = this.cursor.close(exs);
            }
            this.cursor = cursor;
            if (!exs.isEmpty()) {
                throw exs.get(0);
            }
        } catch (Exception ex) {
            List<RuntimeException> exs = new ArrayList();
            exs.add(new RuntimeException(ex));
            exs = this.close(exs);
            if (!exs.isEmpty()) {
                throw exs.get(0);
            }
        }
        cursorMetaData = temporaryTable.getCursorMetaData();
    }

    ITemporaryTable createTemporaryTable(String tableName) throws RuntimeException {
        //用来生成临时表信息的列
        List<ColumnMetaData> keyColumnMetaDataList = new ArrayList<ColumnMetaData>();
        List<ColumnMetaData> valueColumnMetaDataList = new ArrayList<ColumnMetaData>();
        // 用来生成CursorMeta的列
        List<ColumnMetaData> cursorMetaDataKeyColumnMetaDataList = new ArrayList<ColumnMetaData>();
        List<ColumnMetaData> cursorMetaDataValueColumnMetaDataList = new ArrayList<ColumnMetaData>();
        //
        if (cursor == null) {
            if (query == null) {
                throw new IllegalArgumentException("临时表即没给cursor，又没给执行计划");
            }
            cursor = DataNodeExecutorContext.getExecutorContext().getDataNodeExecutor().execute(executeContext, query);
        }
        //
        RowValues rowData = cursor.next();
        if (rowData != null) {
            // 如果没值返回空 ，什么都不做
            return null;
        }
        /**
         * 没有指定排序列，则以第一个列为主键
         */
        if (!$.isNotNullAndHasElement(orderByList)) {
            List<ColumnMetaData> columnMetaDataList = new ArrayList(0);
            columnMetaDataList.add(rowData.getParentCursorMetaData().getColumnMetaDataList().get(0));
            this.setOrderByList(Utils.getOrderByListFromColumnMetaDataList(columnMetaDataList));
        }
        // 遍历cursor的keyColumn，如果某个列是order by的条件，那么放到temp cursor的key里面，否则放到value里面
        // 希望通过kv中的列来构造meta data，因为底层讲avg(pk)，解释成了count，和 sum
        build(rowData, orderByList,//
                keyColumnMetaDataList, valueColumnMetaDataList, //
                cursorMetaDataKeyColumnMetaDataList, cursorMetaDataValueColumnMetaDataList);
        IndexMapping indexMapping = new IndexMapping(tableName, keyColumnMetaDataList, valueColumnMetaDataList, IndexType.btree, true, true);
        TableMetaData tableMetaData = new TableMetaData(tableName, new ArrayList(), indexMapping, null);
        tableMetaData.setTemporaryTable(true);
        tableMetaData.setSortedDuplicates(sortedDuplicates);
        // 增加临时表的判定
        ITemporaryTable temporaryTable = repository.getTemporaryTable(tableMetaData);
        //
        Record keyRecord = RecordCodecFactory.newRecordCodec(keyColumnMetaDataList).newRecord();
        Record valueRecord = null;
        if ($.isNotNullAndHasElement(valueColumnMetaDataList)) {
            valueRecord = RecordCodecFactory.newRecordCodec(valueColumnMetaDataList).newRecord();
        }
        //
        long size = 0;
        boolean protection = false;
        int identityNumber = 0;
        // 将数据插入BDB临时表中。
        do {
            size++;
            if (size > sizeProtection) {
                protection = true;
                break;
            }
            for (ColumnMetaData columnMetaData : keyColumnMetaDataList) {
                String columnName = columnMetaData.getColumnName();
                if (columnName.contains(_temporary_table_)) {
                    int index = columnName.indexOf(_temporary_table_);
                    columnName = columnName.substring(index + _temporary_table_.length());
                }
                Object value = Utils.getValue(rowData, columnMetaData.getTableName(), columnName, null);//表名+列名定位一行真实的数据
                keyRecord.put(columnMetaData.getColumnName(), value);
            }
            if (valueColumnMetaDataList != null && valueColumnMetaDataList.size() != 0) {
                for (ColumnMetaData columnMetaData : valueColumnMetaDataList) {
                    String columnName = columnMetaData.getColumnName();
                    if (identity.equals(columnName)) {
                        continue;
                    }
                    if (columnName.contains(_temporary_table_)) {
                        int index = columnName.indexOf(_temporary_table_);
                        columnName = columnName.substring(index + _temporary_table_.length());
                    }
                    Object value = Utils.getValue(rowData, columnMetaData.getTableName(), columnName, null);//表名+列名定位一行真实的数据
                    valueRecord.put(columnMetaData.getColumnName(), value);
                }
            }
            // value内加入唯一索引key。
            if (sortedDuplicates) {
                valueRecord.put(identity, identityNumber++);
            }
            temporaryTable.put(this.executeContext, tableName, indexMapping, keyRecord, valueRecord);//写入key->value key是查询条件
        } while ((rowData = cursor.next()) != null);
        if (protection && !cutRows) {
            throw new ExecutorException("temp where size protection , check your chars or enlarge the limination size . ");
        }
        List<ColumnMetaData> columnMetaDataList = new ArrayList<ColumnMetaData>();
        columnMetaDataList.addAll(cursorMetaDataKeyColumnMetaDataList);
        columnMetaDataList.addAll(cursorMetaDataValueColumnMetaDataList);
        temporaryTable.setCursorMetaData(CursorMetaDataImpl.buildCursorMetaData(columnMetaDataList));
        return temporaryTable;
    }

    /**
     * 根据行数据对象的列信息和排序信息 构建临时表需要的信息
     */
    private void build(RowValues rowData, List<OrderBy> orderByList, //
                       List<ColumnMetaData> keyColumnMetaDataList, List<ColumnMetaData> valueColumnMetaDataList, //
                       List<ColumnMetaData> cursorMetaDataKeyColumnMetaDataList, List<ColumnMetaData> cursorMetaDataValueColumnMetaDataList) {
        CursorMetaData cursorMetaData = rowData.getParentCursorMetaData();
        List<ColumnMetaData> columnMetaDataList = cursorMetaData.getColumnMetaDataList();
        //
        for (ColumnMetaData columnMetaData : columnMetaDataList) {
            /**
             * 为了防止有列名相同的列，新建一个列，列名由原表名和列名组成
             */
            ColumnMetaData columnMetaDataInTemporaryTable = new ColumnMetaData(//
                    columnMetaData.getTableName(),//
                    columnMetaData.getTableName() + _temporary_table_ + columnMetaData.getColumnName(), //
                    columnMetaData.getAlias(), columnMetaData.getType(), //
                    //
                    columnMetaData.isNullable());//
            OrderBy orderBy = findOrderByByColumnMetaData(orderByList, columnMetaData);
            if (orderBy != null) {
                keyColumnMetaDataList.add(columnMetaDataInTemporaryTable);//如果当前列在orderBy list中则添加到keyColumnMetaDataList
                if (!cursorMetaDataKeyColumnMetaDataList.contains(columnMetaData)) {
                    cursorMetaDataKeyColumnMetaDataList.add(columnMetaData);
                }
            } else {
                // 列名与order by not ruleCalculate ,放到value里
                if (!valueColumnMetaDataList.contains(columnMetaDataInTemporaryTable)) {
                    valueColumnMetaDataList.add(columnMetaDataInTemporaryTable);
                }
                if (!cursorMetaDataValueColumnMetaDataList.contains(columnMetaData)) {
                    cursorMetaDataValueColumnMetaDataList.add(columnMetaData);
                }
            }
        }

        if (keyColumnMetaDataList.size() < orderByList.size()) {
            // order by 的列不存在与cursor中，不可能吧
            throw new RuntimeException("should not be here");
        }
        // 是否针对重复的value进行排序
        if (sortedDuplicates) {
            valueColumnMetaDataList.add(new ColumnMetaData(keyColumnMetaDataList.get(0).getTableName(), identity, null, Type.IntegerType, true));
        }
    }

    private OrderBy findOrderByByColumnMetaData(List<OrderBy> orderByList, ColumnMetaData columnMetaData) {
        for (OrderBy orderBy : orderByList) {
            Item item = orderBy.getItem();
            String tableName = item.getTableName();
            tableName = Utils.getTableName(tableName);
            if (columnMetaData != null && Utils.getTableName(columnMetaData.getTableName()).equals(tableName)) {
                if (columnMetaData.getColumnName().equals(item.getColumnName())) {
                    return orderBy;
                }
            }
        }
        return null;
    }

    public CursorFactory getCursorFactory() {
        return cursorFactory;
    }

    public RowValues next() throws RuntimeException {
        init();
        RowValues next = parentCursorNext();
        next = wrap(next);
        return next;
    }

    private RowValues wrap(RowValues next) {
        if (next != null) {
            next = new RowValuesWrapper(cursorMetaData, next);
        }
        return next;
    }

    public boolean skipTo(Record keyRecord) throws RuntimeException {
        init();
        return parentCursorSkipTo(keyRecord);
    }

    public boolean skipTo(KeyValueRecordPair keyKeyValueRecordPair) throws RuntimeException {
        init();
        return parentCursorSkipTo(keyKeyValueRecordPair);
    }

    public void beforeFirst() throws RuntimeException {
        init();
        parentCursorBeforeFirst();
    }

    public RowValues current() throws RuntimeException {
        init();
        RowValues current = parentCursorCurrent();
        current = wrap(current);
        return current;
    }

    public RowValues first() throws RuntimeException {
        init();
        RowValues first = parentCursorFirst();
        first = wrap(first);
        return first;
    }

    public RowValues last() throws RuntimeException {
        init();
        RowValues last = parentCursorPrev();
        last = wrap(last);
        return last;
    }

    public RowValues prev() throws RuntimeException {
        init();
        RowValues prev = parentCursorPrev();
        prev = wrap(prev);
        return prev;
    }

    public List<RuntimeException> close(List<RuntimeException> exs) {
        exs = parentCursorClose(exs);

        return exs;
    }

    public String toString() {
        return toStringWithInden(0);
    }

    public String toStringWithInden(int inden) {
        String tabTittle = GeneralUtil.getTab(inden);
        String tabContent = GeneralUtil.getTab(inden + 1);
        StringBuilder sb = new StringBuilder();

        sb.append(tabTittle).append("TemporaryTableCursor").append("\n");
        GeneralUtil.printAFieldToStringBuilder(sb, "addOrderByItemAndSetNeedBuild", this.orderByList, tabContent);
        if (this.cursor != null) {
            sb.append(this.targetCursor.toStringWithInden(inden + 1));
        }
        return sb.toString();
    }

    public List<ColumnMetaData> getColumnMetaDataList() throws RuntimeException {
        init();
        return this.cursorMetaData.getColumnMetaDataList();
    }
}
